<a href="https://colab.research.google.com/github/dalgual/bigdai.github.io/blob/main/airbnbPrice_xgboost_ray_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from time import time
from decimal import Decimal
import pyspark.pandas as ps
    
# XGBoost on ray is needed to run this example.
# Please refer to https://docs.ray.io/en/latest/xgboost-ray.html to install it.
import ray
from xgboost_ray import RayXGBClassifier, RayDMatrix, train, RayParams, predict
import raydp
from raydp.utils import random_split
from raydp.spark import RayMLDataset
#from ray import tune

# data_process.py at the same directory
#from data_process import nyc_taxi_preprocess, NYC_TRAIN_CSV
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

In [None]:
# Import Spark SQL and Spark ML libraries
import pyspark

from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [None]:
def with_benchmark(phrase, action):
    """Run ``action()`` and print how long it took.

    Args:
        phrase: label printed before the timing, e.g. ``'training time: '``.
        action: zero-argument callable to execute and time.

    Returns:
        Whatever ``action()`` returns.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() can jump if the system clock is adjusted.
    from time import perf_counter

    started = perf_counter()
    result = action()
    elapsed = perf_counter() - started
    print('{} takes {} seconds'.format(phrase, elapsed))
    return result

In [None]:
# Auxiliary functions
def equivalent_type(f):
    """Return the Spark SQL type matching a pandas/numpy column dtype.

    Args:
        f: a column dtype as found in ``pandas_df.dtypes``.

    Returns:
        A Spark ``DataType`` instance. Dtypes not listed here (object,
        category, ...) fall back to ``StringType()``.
    """
    if f == 'datetime64[ns]':
        return TimestampType()
    elif f == 'int64':
        return LongType()
    elif f == 'int32':
        return IntegerType()
    elif f == 'float64':
        # float64 is double precision; the 32-bit FloatType would drop digits.
        return DoubleType()
    elif f == 'float32':
        return FloatType()
    elif f == 'bool':
        # Give booleans a real Spark type instead of the StringType fallback,
        # which does not fit Python bool values.
        return BooleanType()
    else:
        return StringType()



In [None]:
def define_structure(string, format_type):
    """Build a Spark ``StructField`` for one pandas column.

    Args:
        string: the column name.
        format_type: the column's pandas/numpy dtype.

    Returns:
        ``StructField(string, <type from equivalent_type>)``. If the dtype
        lookup raises, the field falls back to ``StringType()``.
    """
    try:
        spark_type = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback for exotic dtypes. Catch Exception rather than
        # a bare except so KeyboardInterrupt/SystemExit still propagate.
        spark_type = StringType()
    return StructField(string, spark_type)



In [None]:
# Convert a pandas DataFrame into a Spark DataFrame.
# Performance note from earlier runs: 100MB / 10 partitions.
def pandas_to_spark(pandas_df, num_partition):
    """Convert ``pandas_df`` to a Spark DataFrame with an explicit schema.

    The schema has one ``define_structure`` field per column, derived from the
    pandas dtypes rather than inferred by Spark. The result is repartitioned
    into ``num_partition`` partitions. Uses the notebook-level ``spark`` session.
    """
    schema = StructType([
        define_structure(name, dtype)
        for name, dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ])
    spark_df = spark.createDataFrame(pandas_df, schema)
    return spark_df.repartition(num_partition)

In [None]:
# Connect this notebook to the already-running Ray cluster.
# ignore_reinit_error=True lets the cell be re-run without ray.init raising
# because Ray is already initialized in this kernel.
# Single-node alternative (head/master node only): ray.init(num_cpus=2)
# To reconnect from scratch, call ray.shutdown() first.
ray.init(address='auto', ignore_reinit_error=True)


2022-04-08 00:09:20,033	INFO worker.py:861 -- Connecting to existing Ray cluster at address: 172.30.0.26:6379


{'node_ip_address': '172.30.0.26',
 'raylet_ip_address': '172.30.0.26',
 'redis_address': None,
 'object_store_address': '/tmp/ray/session_2022-04-07_19-01-43_954995_4123/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-04-07_19-01-43_954995_4123/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2022-04-07_19-01-43_954995_4123',
 'metrics_export_port': 38615,
 'gcs_address': '172.30.0.26:6379',
 'address': '172.30.0.26:6379',
 'node_id': '88951fc54095205b29f9c860414d6511bd041dac8fba944047078d03'}

In [None]:
# Settings for the Spark session that RayDP starts on top of Ray (next cell).
# Instance used for the reported results: g4dn.2xlarge (1 GPU, 8 vCPUs,
# 32 GiB of memory, 225 NVMe SSD, up to 25 Gbps network performance).
# Other values tried: num_executors 3 or 5 (matching min_workers),
# cores_per_executor 4 or 2, memory_per_executor "6GB", "1GB" or "500M".
app_name = "Airbnb Rating with RayDP"
num_executors, cores_per_executor, memory_per_executor = 1, 1, "2GB"

In [None]:
# Create the Spark session through RayDP using the executor settings from the
# previous cell (app name, number of executors, cores per executor, memory per executor).
spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)


[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,276 WARN NativeCodeLoader [Thread-2]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,330 INFO SecurityManager [Thread-2]: Changing view acls to: ubuntu
[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,330 INFO SecurityManager [Thread-2]: Changing modify acls to: ubuntu
[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,331 INFO SecurityManager [Thread-2]: Changing view acls groups to: 
[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,331 INFO SecurityManager [Thread-2]: Changing modify acls groups to: 
[2m[36m(RayDPSparkMaster pid=14228)[0m 2022-04-08 00:09:23,332 INFO SecurityManager [Thread-2]: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ubuntu); groups with view permissions: Set(); users  with modify permissions: Set(u

In [None]:
# Start the data-read timer; the elapsed time is printed at the end of the read cell.
start = time()

# Data source switch: True reads from the public S3 bucket, False from the
# working directory. Other file names used before: 'airbnb-listings.csv',
# 'airbnb_US.csv', 'airbnb_sample.csv'.
IS_S3 = False
file_name = 'airbnb_listings.csv'

if not IS_S3:
    # Local copy, in the same directory as this notebook.
    phrase = "Local Data Read Time: "
    file_location = "./" + file_name
else:
    phrase = "S3 Data Read Time: "
    file_location = "s3://bigdai-pub/" + file_name

In [None]:
import pandas as pd

# Load the listings into the Spark DataFrame `csv`. `start` was set in the
# previous cell, so the printed time covers everything since then.
if not IS_S3:
    # Local copy (airbnb-listings export) is ';'-delimited; Spark infers the types.
    # Adjust this branch when reading airbnb_US.csv or airbnb_sample.csv.
    csv = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .load(file_location)
    )
else:
    # S3: read with pandas, skipping rows it cannot parse, then convert to
    # Spark with 10 partitions. Alternatives tried and abandoned:
    # ray.data.read_csv, Dask, and reading s3a:// paths with spark.read or
    # pyspark.pandas (both failed with a Hadoop "no class" error).
    # NOTE(review): no `sep` is passed, so pandas splits on ',' while the local
    # branch uses ';' -- confirm the delimiter of the S3 file.
    pandasDF = pd.read_csv(file_location, on_bad_lines='skip')
    csv = pandas_to_spark(pandasDF, 10)

end = time()
print('{} takes {} seconds'.format(phrase, (end - start)))


[2m[33m(raylet)[0m log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
[2m[33m(raylet)[0m log4j:WARN Please initialize the log4j system properly.
[2m[33m(raylet)[0m log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


Local Data Read Time:  takes 17.040000200271606 seconds


In [None]:

# Show the column names and types of the raw listings DataFrame.
#data.show(10)
csv.printSchema()

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- space: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- access: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- medium_url: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- xl_picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable

In [None]:
# Set spark timezone for processing datetime: pin the Spark session time zone
# to UTC so datetime handling does not depend on the machines' local zone.
spark.conf.set("spark.sql.session.timeZone", "UTC")

In [None]:
# Name for an optional temporary SQL view of the listings. The view is not
# registered by default; to query it with spark.sql, register it first with:
#     csv.createOrReplaceTempView(temp_table_name)
temp_table_name = "airbnb_sample_csv"

'df = data\ndf.createOrReplaceTempView(temp_table_name)\n'

In [None]:
# Start the clock for the data-engineering phase; it is reported after the
# RayDMatrix cells.
start = time()
phrase = "data engineering time: "

In [None]:
# jwoo
#csv = spark.sql("SELECT * FROM airbnb_sample_csv")
#airbnb_sample.csv
#csv=data
# airibnb_US has entire columns while we need only the following column
# csv = data.select("Review Scores Rating", "Host Listings Count", "Host Total Listings Count", "Calculated host listings count", "Security Deposit", "Cleaning Fee" , "Host Response Time","Host Response Rate","Host Acceptance Rate","Property Type","Room Type","Price","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Cancellation Policy","Bedrooms","Bathrooms","Beds","Extra People","Minimum Nights")

#csv.show(5)

In [None]:

# Binarize the rating: 1 when review_scores_rating >= 80, otherwise 0 (nulls -> 0).
csv = csv.withColumn("review_scores_rating", when(col("review_scores_rating") >= 80,1).otherwise(0))

# Cast the numeric feature/target columns to integers (the raw schema shows them
# as strings). withColumn on an existing name replaces it in place.
# NOTE(review): an integer cast cannot keep fractional values such as
# 1.5 bathrooms; consider DoubleType if that precision matters.
for int_col in [
    "host_response_rate", "host_listings_count", "host_total_listings_count",
    "price", "weekly_price", "monthly_price",
    "maximum_nights", "review_scores_accuracy", "review_scores_cleanliness",
    "review_scores_checkin", "review_scores_communication", "review_scores_location",
    "review_scores_value", "calculated_host_listings_count",
    "bedrooms", "bathrooms", "beds", "security_deposit",
    "host_acceptance_rate", "cleaning_fee", "extra_people", "minimum_nights",
]:
    csv = csv.withColumn(int_col, csv[int_col].cast(IntegerType()))

csv.show(5)


+--------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+-------------+----------------------+----------------------------+-------------+--------------------+-------+----------+--------------------+------------+-------+------------------+------------------+-------------+---------------+------------+---------+--------+----+--------+--------------------+-----------+-----+------------+-------------+-------

In [None]:
# Drop listings whose minimum stay exceeds one year (rows with a null
# minimum_nights are dropped too, since the comparison is not true for them).
csv = csv.filter(col("minimum_nights") <= 365)

# Model inputs; the nightly price becomes the regression target `label`.
# NOTE(review): weekly_price and monthly_price are likely to be strongly tied
# to the nightly price target -- check for target leakage.
feature_columns = [
    "review_scores_rating", "host_listings_count", "host_total_listings_count",
    "calculated_host_listings_count", "security_deposit", "cleaning_fee",
    "host_response_time", "host_response_rate", "host_acceptance_rate",
    "property_type", "room_type", "bed_type", "weekly_price", "monthly_price",
    "maximum_nights", "review_scores_accuracy", "review_scores_cleanliness",
    "review_scores_checkin", "review_scores_communication", "review_scores_location",
    "review_scores_value", "cancellation_policy", "bedrooms", "bathrooms", "beds",
    "extra_people", "minimum_nights",
]
data = csv.select(*feature_columns, col("price").cast("Int").alias("label"))

data.show(5)

+--------------------+-------------------+-------------------------+------------------------------+----------------+------------+------------------+------------------+--------------------+-------------+---------------+--------+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+
|review_scores_rating|host_listings_count|host_total_listings_count|calculated_host_listings_count|security_deposit|cleaning_fee|host_response_time|host_response_rate|host_acceptance_rate|property_type|      room_type|bed_type|weekly_price|monthly_price|maximum_nights|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|cancellation_policy|bedrooms|bathrooms|beds|extra_people|minimum_nights|label|
+-------------------

In [None]:
# Keep only listings whose property_type is one of the known categories below.
property_list = [
    "Apartment", "House", "Bed & Breakfast", "Condominium", "Loft", "Townhouse",
    "Other", "Villa", "Guesthouse", "Bungalow", "Dorm", "Boat", "Cabin", "Chalet",
    "Boutique hotel", "Serviced apartment", "Hostel", "Camper/RV", "Timeshare",
    "Guest suite", "Tent", "Vacation home",
    # Two separate categories: written as the single string "Castle, Treehouse"
    # neither would ever match a property_type value.
    "Castle", "Treehouse",
    "In-law", "Earth House", "Hut", "Yurt", "Entire Floor", "Tipi", "Nature lodge",
    "Cave", "Lighthouse", "Casa particular", "Train", "Island", "Igloo",
    "Parking Space", "Pension (Korea)", "Ryokan (Japan)", "Car",
    "Heritage hotel (India)", "Plane", "Van",
]

data = data.filter(data.property_type.isin(property_list))

In [None]:
# Persist the filtered listings on disk so the quantile, filter and indexing
# steps below reuse them instead of recomputing them from the CSV.
# DISK_ONLY_2 asks for a second replica on another executor, which cannot be
# placed when only one executor runs; fall back to a single copy then.
storage_level = (pyspark.StorageLevel.DISK_ONLY_2 if num_executors > 1
                 else pyspark.StorageLevel.DISK_ONLY)
data.persist(storage_level)

DataFrame[review_scores_rating: int, host_listings_count: int, host_total_listings_count: int, calculated_host_listings_count: int, security_deposit: int, cleaning_fee: int, host_response_time: string, host_response_rate: int, host_acceptance_rate: int, property_type: string, room_type: string, bed_type: string, weekly_price: int, monthly_price: int, maximum_nights: int, review_scores_accuracy: int, review_scores_cleanliness: int, review_scores_checkin: int, review_scores_communication: int, review_scores_location: int, review_scores_value: int, cancellation_policy: string, bedrooms: int, bathrooms: int, beds: int, extra_people: int, minimum_nights: int, label: int]


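## Data Cleaning
**Handling Outliers:** Rows whose `bathrooms`, `bedrooms`, `monthly_price`, `extra_people`, `minimum_nights` or `label` value falls outside the 5th–95th percentile range are removed first (rows with a missing value in any of these columns are removed as well).

**Handling Missing Values:** Filling the missing values of numeric columns with **'0'** and string columns with **'NA'**.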

In [None]:
# Trim outliers: keep rows whose value lies inside the exact (relativeError=0.0)
# 5th-95th percentile range of each column below. Rows with a null in any of
# these columns are dropped too, because a comparison with null is not true.
outlier_columns = ["bathrooms", "bedrooms", "monthly_price", "extra_people", "minimum_nights", "label"]
outliers = data.stat.approxQuantile(outlier_columns, [0.05, 0.95], 0.0)

print(outliers)

final_data = data
for column_name, bounds in zip(outlier_columns, outliers):
    # approxQuantile returns an empty list for a column with no non-null
    # values; skip such a column instead of failing on the unpacking below.
    if not bounds:
        continue
    low, high = bounds
    final_data = final_data.filter((col(column_name) >= low) & (col(column_name) <= high))

final_data.show(30)


[[1.0, 2.0], [0.0, 3.0], [600.0, 8950.0], [0.0, 50.0], [1.0, 7.0], [29.0, 495.0]]
+--------------------+-------------------+-------------------------+------------------------------+----------------+------------+------------------+------------------+--------------------+-------------+---------------+--------+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+
|review_scores_rating|host_listings_count|host_total_listings_count|calculated_host_listings_count|security_deposit|cleaning_fee|host_response_time|host_response_rate|host_acceptance_rate|property_type|      room_type|bed_type|weekly_price|monthly_price|maximum_nights|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|cancellation_poli

In [None]:
# Fill the remaining nulls: 0 in numeric columns, "NA" in string columns.
data_clean = final_data.fillna(0).fillna("NA")

data_clean.show(5)

+--------------------+-------------------+-------------------------+------------------------------+----------------+------------+------------------+------------------+--------------------+-------------+---------------+--------+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+
|review_scores_rating|host_listings_count|host_total_listings_count|calculated_host_listings_count|security_deposit|cleaning_fee|host_response_time|host_response_rate|host_acceptance_rate|property_type|      room_type|bed_type|weekly_price|monthly_price|maximum_nights|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|cancellation_policy|bedrooms|bathrooms|beds|extra_people|minimum_nights|label|
+-------------------

In [None]:
# Encode the categorical columns as numeric indices for XGBoost.
# handleInvalid: "skip" filters out rows with invalid (unseen or null) labels,
# "keep" puts them into an extra index, and the default "error" raises.
# Each indexer is fitted on the output of the previous one, as in a Pipeline.
indexers = [
    StringIndexer(inputCol='host_response_time', outputCol='host_response_time_index', handleInvalid="skip"),
    StringIndexer(inputCol='cancellation_policy', outputCol='cancellation_policy_index', handleInvalid="skip"),
    StringIndexer(inputCol='property_type', outputCol='property_type_index', handleInvalid="keep"),
    StringIndexer(inputCol='room_type', outputCol='room_type_index', handleInvalid="keep"),
    StringIndexer(inputCol='bed_type', outputCol='bed_type_index', handleInvalid="keep"),
    StringIndexer(inputCol="review_scores_rating", outputCol='review_scores_rating_index'),
]
data_clean = Pipeline(stages=indexers).fit(data_clean).transform(data_clean)

data_clean.show(5)

+--------------------+-------------------+-------------------------+------------------------------+----------------+------------+------------------+------------------+--------------------+-------------+---------------+--------+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+------------------------+-------------------------+-------------------+---------------+--------------+--------------------------+
|review_scores_rating|host_listings_count|host_total_listings_count|calculated_host_listings_count|security_deposit|cleaning_fee|host_response_time|host_response_rate|host_acceptance_rate|property_type|      room_type|bed_type|weekly_price|monthly_price|maximum_nights|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores

In [None]:
# Drop the raw categorical columns now that their *_index encodings exist.
encoded_sources = ['host_response_time', "property_type", "room_type", "bed_type", "cancellation_policy", "review_scores_rating"]
final_df = data_clean.drop(*encoded_sources)
final_df.show(5)

+-------------------+-------------------------+------------------------------+----------------+------------+------------------+--------------------+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+--------+---------+----+------------+--------------+-----+------------------------+-------------------------+-------------------+---------------+--------------+--------------------------+
|host_listings_count|host_total_listings_count|calculated_host_listings_count|security_deposit|cleaning_fee|host_response_rate|host_acceptance_rate|weekly_price|monthly_price|maximum_nights|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|bedrooms|bathrooms|beds|extra_people|minimum_nights|label|host_response_time_index|cancellation_policy_index|property_type_index|room_type_index|bed

In [None]:
# Check the final column types before converting to pandas / RayDMatrix.
final_df.printSchema()

root
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- security_deposit: integer (nullable = true)
 |-- cleaning_fee: integer (nullable = true)
 |-- host_response_rate: integer (nullable = true)
 |-- host_acceptance_rate: integer (nullable = true)
 |-- weekly_price: integer (nullable = true)
 |-- monthly_price: integer (nullable = true)
 |-- maximum_nights: integer (nullable = true)
 |-- review_scores_accuracy: integer (nullable = true)
 |-- review_scores_cleanliness: integer (nullable = true)
 |-- review_scores_checkin: integer (nullable = true)
 |-- review_scores_communication: integer (nullable = true)
 |-- review_scores_location: integer (nullable = true)
 |-- review_scores_value: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- beds: integer (nullable = true)
 |-- extra_people: integer (null

In [None]:

# Split the data 70/30 into training and test sets. A fixed seed makes the
# split repeatable across runs (given the same input partitioning), so the
# reported RMSE values can be reproduced.
SPLIT_SEED = 42
splits = final_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

train_df = splits[0]
# Rename the test label so it is clearly the ground truth; the test
# RayDMatrix is built with label='trueLabel'.
test_df = splits[1].withColumnRenamed("label", "trueLabel")

print("Training Rows:", train_df.count(), " Testing Rows:", test_df.count())

Training Rows: 33330  Testing Rows: 14390


In [None]:
# Collect both splits to the driver as pandas DataFrames (input for RayDMatrix).
train_df_pandas, test_df_pandas = train_df.toPandas(), test_df.toPandas()
print(train_df_pandas)


       host_listings_count  host_total_listings_count  \
0                        0                          0   
1                        0                          0   
2                        0                          0   
3                        0                          0   
4                        1                          1   
...                    ...                        ...   
33325                   93                         93   
33326                   95                         95   
33327                  106                        106   
33328                  177                        177   
33329                  879                        879   

       calculated_host_listings_count  security_deposit  cleaning_fee  \
0                                   1                 0             0   
1                                   1                 0           100   
2                                   1               154            20   
3                      

In [None]:
# Distributed loading [7]: hand the Spark DataFrames to RayDMatrix directly.
# In this setup the call fails with
#     ValueError: Unknown data source type: <class 'list'> with FileType: None.
# so the error is caught and shown instead of aborting "Run All"; the next
# cell builds the matrices from the pandas copies instead.
# Another route from Spark to Ray: ray.data.from_spark(train_df)
try:
    train_dataset = RayDMatrix(train_df, label="label", enable_categorical=True)
    test_dataset = RayDMatrix(test_df, label='trueLabel', enable_categorical=True)
except ValueError as err:
    print("RayDMatrix could not load the Spark DataFrames directly:", err)


__ValueError__: Unknown data source type: `<class 'list'>` with FileType: None.
FIX THIS by passing a supported data type. Supported data types include pandas.DataFrame, pandas.Series, np.ndarray, and CSV/Parquet file paths. If you specify a file, path, consider passing the `filetype` argument to specify the type of the source. Use the `RayFileType` enum for that. If using Modin, Dask, or Petastorm, make sure the library is installed.

__Code that raised the error:__

```
train_set = RayDMatrix(train_list, label='label', enable_categorical=True) # error: enable_categorical=True
test_set = RayDMatrix(test_list, label='trueLabel', enable_categorical=True)
```

In [None]:
# Build the XGBoost-on-Ray matrices from the pandas copies, following the
# xgboost_ray classifier example [1].
# An earlier error ("categorical type is supplied, DMatrix parameter
# `enable_categorical` must be set to `True`") listed the raw string columns
# (host response time, property type, room type, cancellation policy); those
# are replaced by numeric *_index columns before this point.
train_set = RayDMatrix(train_df_pandas, label='label', enable_categorical=True)
test_set = RayDMatrix(test_df_pandas, label='trueLabel', enable_categorical=True)

'\ntrain_set = RayDMatrix(train_x, train_y, enable_categorical=True)\ntest_set = RayDMatrix(test_x, test_y, enable_categorical=True)'

In [None]:
# Report how long the data-engineering phase took.
end = time()
elapsed = end - start
print(f'{phrase} takes {elapsed} seconds')

data engineering time:  takes 27.490825176239014 seconds


#### ValueError: DataFrame.dtypes for data must be int, float, bool or category.
When categorical type is supplied, DMatrix parameter `enable_categorical` must be set to `True`. Invalid columns: __Host Response Time, Property Type, Room Type, Cancellation Policy__

(reported by `_RemoteRayXGBoostActor pid=1198`)


In [None]:
# Filled in by xgboost_ray's train() with per-iteration metrics, keyed first by
# eval name ("train"/"test") and then by metric name (e.g. "rmse").
evals_result = {}

# Binary-classification variant kept for reference; it needs a 0/1 label
# rather than price:
# bst = train(
#     {
#         "objective": "binary:logistic",
#         "eval_metric": ["logloss", "error"],
#     },
#     train_set,
#     evals_result=evals_result,
#     evals=[(train_set, "train"), (test_set, "test")],
#     verbose_eval=False,
#     ray_params=RayParams(
#         num_actors=2,  # Number of remote actors
#         cpus_per_actor=1))

'\nbst = train(\n    {\n        "objective": "binary:logistic",\n        "eval_metric": ["logloss", "error"], #, "rmse", "r2"],\n    },\n    train_set,\n    evals_result=evals_result,\n    evals=[(train_set, "train"), (test_set, "test")],\n    verbose_eval=False,\n    ray_params=RayParams(\n        num_actors=2,  # Number of remote actors\n        cpus_per_actor=1))\n'

In [None]:
# Train an XGBoost regressor on Ray and time it. RMSE and RMSLE are tracked
# on both splits and written into evals_result.
xgb_params = {
    # "tree_method": "gpu_hist",  # GPU option; pair with gpus_per_actor=1
    "objective": "reg:squarederror",  # default objective for regression
    "eval_metric": ["rmse", "rmsle"],
}

bst = with_benchmark('training time: ', lambda: train(
    xgb_params,
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train"), (test_set, "test")],
    verbose_eval=False,
    # num_actors: remote training actors (3 or 5 tried, matching min_workers)
    ray_params=RayParams(num_actors=3, cpus_per_actor=1),
))


2022-04-08 00:10:09,169	INFO main.py:985 -- [RayXGBoost] Created 3 new actors (3 total actors). Waiting until actors are ready for training.
2022-04-08 00:10:10,635	INFO main.py:1030 -- [RayXGBoost] Starting XGBoost training.
[2m[36m(_RemoteRayXGBoostActor pid=14872)[0m [00:10:10] task [xgboost.ray]:140415120672016 got new rank 0
[2m[36m(_RemoteRayXGBoostActor pid=14968)[0m [00:10:10] task [xgboost.ray]:139852450847952 got new rank 1
[2m[36m(_RemoteRayXGBoostActor pid=14969)[0m [00:10:10] task [xgboost.ray]:140653212425872 got new rank 2
2022-04-08 00:10:12,323	INFO main.py:1512 -- [RayXGBoost] Finished XGBoost training on training data with total N=33,330 in 3.84 seconds (1.68 pure XGBoost training time).


training time:  takes 4.33713960647583 seconds


In [None]:
# To keep the model: bst.save_model("air_model.xgb")
final_train_rmse = evals_result["train"]["rmse"][-1]
print(f"Final training rmse: {final_train_rmse:.4f}")

Final training rmse: 30.0343


In [None]:
# RMSE of the last boosting round on the held-out split.
final_test_rmse = evals_result["test"]["rmse"][-1]
print(f"Final testing rmse: {final_test_rmse:.4f}")

Final testing rmse: 31.1193


In [None]:
final_train_rmsle = evals_result["train"]["rmsle"][-1]
print(f"Final training rmsle: {final_train_rmsle:.4f}")

Final training rmsle: 0.2343


In [None]:
final_test_rmsle = evals_result["test"]["rmsle"][-1]
print(f"Final testing rmsle: {final_test_rmsle:.4f}")

Final testing rmsle: 0.2408


In [None]:
# Predict on the test matrix with 2 remote actors (1 CPU each); `predict` is
# imported in the first cell. To reload a saved model instead:
#     bst = xgb.Booster(model_file="air_model.xgb")
pred_ray = predict(
    bst,
    test_set,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1),
)

print(pred_ray)

2022-04-08 00:10:12,843	INFO main.py:1549 -- [RayXGBoost] Created 2 remote actors.
2022-04-08 00:10:14,229	INFO main.py:1566 -- [RayXGBoost] Starting XGBoost prediction.


[ 60.19665   43.606106  38.75093  ... 191.93571  139.84459   68.27347 ]


In [None]:
# Actual test prices, to compare with pred_ray (test_set was built from this frame).
print(test_df_pandas.loc[:, "trueLabel"])

0         55
1         45
2         40
3         39
4         30
        ... 
14385    275
14386    175
14387    167
14388    145
14389     81
Name: trueLabel, Length: 14390, dtype: int32


In [None]:
# Release cluster resources: stop the RayDP Spark session first, then
# disconnect from Ray. `finally` ensures Ray is still shut down if stopping
# Spark raises.
try:
    raydp.stop_spark()
finally:
    ray.shutdown()

### Experimental Result:      
#### min_workers: 2, max_workers: 3

1. Experiment 1 (head node: g4dn.2xlarge): num_executors = 1, cores_per_executor = 1

|File Size| Loc | workers | no actors |Partition| no_executors| cores/exe| mem/exe| max workers| Train (s)| Read| Data Eng| Train rmse | Test rmse | Train rmsle | Test rmsle |
| :---    | :----:  | :----: | :----:   | :----:   | :----:   | :----:   | :----:   | :----:   | :----:   | :----:| :----:| :----:   | :----:   | :----:   | ---: |
|1.930GB | local |2- 8  | 3 |   NA | 1| 1| "2GB"| | 4.34 (hist)|17.04 | 27.49  | 30.03 | 31.12 |0.2343 |0.2408 |



### References
1. Ray Breast Cancer Example, https://github.com/ray-project/xgboost_ray 
1. Data Read API, https://docs.ray.io/en/releases-1.9.1/_modules/ray/data/read_api.html
1. actors, https://docs.ray.io/en/latest/ray-core/actors.html
1. https://docs.ray.io/en/ray-1.1.0/xgboost-ray.html
1. https://sparkbyexamples.com/pyspark/convert-pyspark-dataframe-to-pandas/
1. https://sparkbyexamples.com/pandas/pandas-select-columns-by-name-or-index/
1. https://github.com/ray-project/xgboost_ray#distributed-data-loading
1. https://www.anyscale.com/blog/building-an-end-to-end-ml-pipeline-using-mars-and-xgboost-on-ray
1. Evaluation Example in Dask, https://docs.ray.io/en/latest/ray-core/examples/dask_xgboost/dask_xgboost.html
1. Read a CSV or other delimited file with Arrow, https://arrow.apache.org/docs/r/reference/read_delim_arrow.html
1.  Data Science with Python and Dask, Manning Book, https://livebook.manning.com/book/data-science-at-scale-with-python-and-dask/chapter-4/21
1. XGBoost Parameters, https://xgboost.readthedocs.io/en/stable/parameter.html#parameters-for-tweedie-regression-objective-reg-tweedie
1. https://www.analyticsvidhya.com/blog/2021/05/know-the-best-evaluation-metrics-for-your-regression-model/