<a href="https://colab.research.google.com/github/dalgual/bigdai.github.io/blob/main/airbnbRating_xgboost_ray_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from time import time
from decimal import Decimal

# XGBoost on ray is needed to run this example.
# Please refer to https://docs.ray.io/en/latest/xgboost-ray.html to install it.
import ray
from xgboost_ray import RayXGBClassifier, RayDMatrix, train, RayParams, predict
import raydp
from raydp.utils import random_split
from raydp.spark import RayMLDataset
#from ray import tune

# data_process.py at the same directory
#from data_process import nyc_taxi_preprocess, NYC_TRAIN_CSV



In [None]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [None]:
def with_benchmark(phrase, action):
    """Run ``action()`` once, print how long it took, and return its result.

    Args:
        phrase: label printed in front of the elapsed time.
        action: zero-argument callable to execute and time.

    Returns:
        Whatever ``action()`` returns.
    """
    # perf_counter is monotonic and high-resolution; unlike time() it is not
    # affected by system clock adjustments during a long-running action.
    from time import perf_counter

    start = perf_counter()
    result = action()
    end = perf_counter()
    print('{} takes {} seconds'.format(phrase, (end - start)))
    return result

In [None]:
# Auxiliary functions
def equivalent_type(f):
    """Map a pandas column dtype to the Spark SQL type used in the schema.

    Args:
        f: a pandas/numpy dtype (or its string name), as found in ``DataFrame.dtypes``.

    Returns:
        A ``pyspark.sql.types`` DataType instance. Unrecognised dtypes
        (including ``object``) fall back to ``StringType``.
    """
    # Compare by name so numpy dtype objects and plain strings behave the same.
    name = str(f)
    if name == 'datetime64[ns]':
        return TimestampType()
    if name == 'int64':
        return LongType()
    if name == 'int32':
        return IntegerType()
    if name == 'float64':
        # float64 is double precision; FloatType (32-bit) would silently drop digits.
        return DoubleType()
    if name == 'float32':
        return FloatType()
    if name == 'bool':
        # Spark refuses Python bool values in a StringType field.
        return BooleanType()
    return StringType()



In [None]:
def define_structure(string, format_type):
    """Build a Spark ``StructField`` for one pandas column.

    Args:
        string: the column name.
        format_type: the column's pandas dtype.

    Returns:
        ``StructField(string, <mapped type>)``. ``StringType`` is used if the dtype
        cannot be mapped.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Narrowed from a bare ``except:`` so Ctrl-C / SystemExit still propagate.
        typo = StringType()
    return StructField(string, typo)



In [None]:
# Convert a pandas DataFrame into a Spark DataFrame.
# Performance hint from earlier runs: 100MB / 10 partitions.
def pandas_to_spark(pandas_df, num_partition, spark_session=None):
    """Convert ``pandas_df`` to Spark using a schema derived from its dtypes.

    Each column's pandas dtype is mapped to a Spark type via ``define_structure``.
    The resulting DataFrame is then repartitioned.

    Args:
        pandas_df: source pandas DataFrame.
        num_partition: number of partitions of the returned Spark DataFrame.
        spark_session: SparkSession to use. Defaults to the notebook-global
            ``spark`` (created by ``raydp.init_spark``), as before.

    Returns:
        A Spark DataFrame with ``num_partition`` partitions.
    """
    session = spark if spark_session is None else spark_session
    p_schema = StructType([
        define_structure(column, dtype)
        for column, dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ])
    return session.createDataFrame(pandas_df, p_schema).repartition(num_partition)

In [None]:
# Connect to the already-running Ray cluster (address='auto').
# Alternatives:
#   - force a clean reconnect: ray.shutdown() before ray.init(address='auto')
#   - single host/master node only: ray.init(num_cpus=2)
# ignore_reinit_error=True lets this cell be re-run without restarting the kernel;
# otherwise a second ray.init() call raises.
ray.init(address='auto', ignore_reinit_error=True)


2022-03-28 06:25:52,179	INFO worker.py:861 -- Connecting to existing Ray cluster at address: 172.30.0.247:6379


{'node_ip_address': '172.30.0.247',
 'raylet_ip_address': '172.30.0.247',
 'redis_address': None,
 'object_store_address': '/tmp/ray/session_2022-03-28_03-50-58_169987_3837/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-03-28_03-50-58_169987_3837/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2022-03-28_03-50-58_169987_3837',
 'metrics_export_port': 59541,
 'gcs_address': '172.30.0.247:6379',
 'address': '172.30.0.247:6379',
 'node_id': '17398847d8899ba9bde83f7f40fea8521cf7fa73c7ce3e8994dc59b0'}

In [None]:
# RayDP (Spark on Ray) settings; once Ray is initialised, raydp.init_spark uses
# these to start a Spark session.
# Target machine: g4dn.2xlarge - 1 GPU, 8 vCPUs, 32 GiB memory, 225 NVMe SSD,
# up to 25 Gbps network.
app_name = "Airbnb Rating with RayDP"

# Values tried before: cores per executor 4 / 2 / 1; memory "2GB" / "1GB" / "500M".
num_executors, cores_per_executor = 1, 1
memory_per_executor = "2GB"

In [None]:
# Start a Spark session whose master and executors run inside the Ray cluster.
spark = raydp.init_spark(
    app_name=app_name,
    num_executors=num_executors,
    executor_cores=cores_per_executor,
    executor_memory=memory_per_executor,
)


[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,767 WARN NativeCodeLoader [Thread-2]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,851 INFO SecurityManager [Thread-2]: Changing view acls to: ubuntu
[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,851 INFO SecurityManager [Thread-2]: Changing modify acls to: ubuntu
[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,852 INFO SecurityManager [Thread-2]: Changing view acls groups to: 
[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,852 INFO SecurityManager [Thread-2]: Changing modify acls groups to: 
[2m[36m(RayDPSparkMaster pid=14117)[0m 2022-03-28 06:25:55,852 INFO SecurityManager [Thread-2]: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ubuntu); groups with view permissions: Set(); users  with modify permissions: Set(u

In [None]:
start = time()

# Where to read the CSV from: the public S3 bucket or a local copy.
IS_S3 = True  # True: S3 bucket, False: local file
file_name = 'airbnb_US.csv'  # alternative: 'airbnb_sample.csv'

if not IS_S3:
    # Local copy in the same directory as this notebook.
    phrase = "Local Data Read Time: "
    file_location = "./" + file_name
else:
    # Public bucket (it also holds airbnb-listings.csv).
    phrase = "S3 Data Read Time: "
    file_location = "s3://bigdai-pub/" + file_name

In [None]:
import pandas as pd
import pyspark.pandas as ps  # NOTE(review): not used in this cell

# Restart the clock here so the reported time covers only the read below. Before,
# it also included the imports above and any idle time since the previous cell.
start = time()

if IS_S3:
    # Reading the whole CSV from S3 via ray.data.read_csv(...) or
    # spark.read.format("csv") was tried. Instead, one pre-split chunk is read
    # with pandas and converted to Spark with an explicit schema.
    # NOTE(review): only split x00 is loaded; airbnb_US_x01.csv / airbnb_US_x02.csv
    # under the same prefix are candidates for the rest - confirm intent.
    file_location = "s3://bigdai-pub/splits/airbnb_US_x00.csv"
    # on_bad_lines='skip' drops malformed rows instead of raising.
    pandasDF = pd.read_csv(file_location, on_bad_lines='skip')
    data = pandas_to_spark(pandasDF, 30)  # 30 Spark partitions
else:
    # Local CSV: Spark reads the header row and infers column types.
    data = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file_location)
    )

end = time()
print('{} takes {} seconds'.format(phrase, (end - start)))


  exec(code_obj, self.user_global_ns, self.user_ns)


S3 Data Read Time:  takes 11.245431184768677 seconds


In [None]:

# Optional: preview the first 10 loaded rows (uncomment to run).
#data.show(10)

In [None]:
# Use UTC as the Spark session time zone for all datetime processing.
spark.conf.set(key="spark.sql.session.timeZone", value="UTC")

In [None]:
# Expose the loaded data as a temporary SQL view, so it can also be queried with
# spark.sql("SELECT ... FROM airbnb_sample_csv").
temp_table_name = "airbnb_sample_csv"
data.createOrReplaceTempView(temp_table_name)
df = data  # short alias of `data`


In [None]:
# Working DataFrame for feature engineering: the same object as `data`, which is
# equivalent to spark.sql("SELECT * FROM airbnb_sample_csv").
# A narrower projection, df.select("Review Scores Rating", "Host Listings Count", ...),
# was also tried.
# NOTE(review): the name `csv` shadows the stdlib module; kept because later cells use it.
csv = data

In [None]:
# Begin timing the data-engineering stage; a later cell reports the elapsed time
# using `phrase` as the label.
start = time()
phrase = "data engineering time: "

In [None]:
# Binarise the target in place: a review score of 80 or more becomes 1, anything
# else becomes 0. That includes listings without a score, because a null
# comparison is not true and falls through to otherwise(0).
# NOTE(review): unrated listings are therefore labelled as low-rated - confirm intent.
csv = csv.withColumn("Review Scores Rating", when(col("Review Scores Rating") >= 80, 1).otherwise(0))

# Numeric columns coerced to integers (same columns, same order as before).
# NOTE(review): IntegerType truncates fractional values, e.g. Bathrooms 1.5 -> 1.
INT_COLUMNS = [
    "Host Response Rate",
    "Host Listings Count",
    "Host Total Listings Count",
    "Price",
    "Weekly Price",
    "Monthly Price",
    "Maximum Nights",
    "Review Scores Accuracy",
    "Review Scores Cleanliness",
    "Review Scores Checkin",
    "Review Scores Communication",
    "Review Scores Location",
    "Review Scores Value",
    "Calculated host listings count",
    "Bedrooms",
    "Bathrooms",
    "Beds",
    "Security Deposit",
    "Host Acceptance Rate",
    "Cleaning Fee",
    "Extra People",
    "Minimum Nights",
]
for column_name in INT_COLUMNS:
    csv = csv.withColumn(column_name, csv[column_name].cast(IntegerType()))

csv.show(5)


+--------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+-------------+-----+-------+-------------+-----------------+------------+-------------+------------------+-------------------+-------------+---------------+------------+---------+--------+----+--------+--------------------+-----------+-----+------------+-----------

In [None]:
# Keep only listings whose minimum stay is at most one year. Rows with a null
# "Minimum Nights" are dropped too, since the comparison evaluates to null.
csv = csv.where(col("Minimum Nights") <= 365)

# Model inputs: 21 raw features plus the binarised rating, renamed to "label".
# Earlier variants also selected host listing counts, neighbourhood columns,
# Security Deposit, Cleaning Fee, Calendar Updated and Amenities.
data = csv.select(
    "Host Response Time", "Host Response Rate", "Host Acceptance Rate",
    "Property Type", "Room Type",
    "Price", "Weekly Price", "Monthly Price",
    "Maximum Nights",
    "Review Scores Accuracy", "Review Scores Cleanliness", "Review Scores Checkin",
    "Review Scores Communication", "Review Scores Location", "Review Scores Value",
    "Cancellation Policy",
    "Bedrooms", "Bathrooms", "Beds",
    "Extra People", "Minimum Nights",
    col("Review Scores Rating").alias("label"),
)

data.show(5)

+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+
|Host Response Time|Host Response Rate|Host Acceptance Rate|Property Type|      Room Type|Price|Weekly Price|Monthly Price|Maximum Nights|Review Scores Accuracy|Review Scores Cleanliness|Review Scores Checkin|Review Scores Communication|Review Scores Location|Review Scores Value|Cancellation Policy|Bedrooms|Bathrooms|Beds|Extra People|Minimum Nights|label|
+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+--

In [None]:
# Replace nulls in the numeric columns with 0. A numeric fill value leaves string
# columns untouched, so null categoricals remain. With handleInvalid="skip" in the
# indexing step, those rows are then dropped.
# (Alternative tried: chaining .na.fill("NA") to keep them as their own category.)
data_clean = data.fillna(0)
data_clean.show(5)

+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+
|Host Response Time|Host Response Rate|Host Acceptance Rate|Property Type|      Room Type|Price|Weekly Price|Monthly Price|Maximum Nights|Review Scores Accuracy|Review Scores Cleanliness|Review Scores Checkin|Review Scores Communication|Review Scores Location|Review Scores Value|Cancellation Policy|Bedrooms|Bathrooms|Beds|Extra People|Minimum Nights|label|
+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+--

In [None]:
# Encode each categorical string column as a numeric index column.
# handleInvalid="skip" drops rows whose value is null or unseen; "keep" would
# retain them under an extra index instead.
# Pipeline fits the stages in order, each on the previous stage's output. This is
# the same as chaining fit(...).transform(...) by hand for each indexer.
# (A "Bed Type" -> "Bed_Type_index" indexer was also tried.)
categorical_indexers = [
    StringIndexer(inputCol=input_col, outputCol=output_col).setHandleInvalid("skip")
    for input_col, output_col in [
        ("Host Response Time", "Host_Response_index"),
        ("Cancellation Policy", "Cancellation_index"),
        ("Property Type", "Property_Type_index"),
        ("Room Type", "Room_Type_index"),
    ]
]
df4 = Pipeline(stages=categorical_indexers).fit(data_clean).transform(data_clean)


'stringindexer3 = StringIndexer(inputCol=\'Bed Type\', outputCol=\'Bed_Type_index\').setHandleInvalid("skip") #.fit(data_clean).transform(data_clean)\ndf3 =  stringindexer3.fit(df2).transform(df2)\n'

In [None]:
# Inspect the new *_index columns next to the raw categorical strings.
df4.show(n=5)

+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------------------+--------+---------+----+------------+--------------+-----+-------------------+------------------+-------------------+---------------+
|Host Response Time|Host Response Rate|Host Acceptance Rate|Property Type|      Room Type|Price|Weekly Price|Monthly Price|Maximum Nights|Review Scores Accuracy|Review Scores Cleanliness|Review Scores Checkin|Review Scores Communication|Review Scores Location|Review Scores Value|Cancellation Policy|Bedrooms|Bathrooms|Beds|Extra People|Minimum Nights|label|Host_Response_index|Cancellation_index|Property_Type_index|Room_Type_index|
+------------------+------------------+--------------------+-------------+---------------+-----+------------+-------------+---------

In [None]:
# Remove the raw categorical string columns; their *_index encodings replace them.
# XGBoost rejects string-typed columns.
# (Equivalent alternative: select the remaining columns explicitly.)
final_df = df4.drop(*["Host Response Time", "Property Type", "Room Type", "Cancellation Policy"])
final_df.show(5)

+------------------+--------------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+--------+---------+----+------------+--------------+-----+-------------------+------------------+-------------------+---------------+
|Host Response Rate|Host Acceptance Rate|Price|Weekly Price|Monthly Price|Maximum Nights|Review Scores Accuracy|Review Scores Cleanliness|Review Scores Checkin|Review Scores Communication|Review Scores Location|Review Scores Value|Bedrooms|Bathrooms|Beds|Extra People|Minimum Nights|label|Host_Response_index|Cancellation_index|Property_Type_index|Room_Type_index|
+------------------+--------------------+-----+------------+-------------+--------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+--------+---------+----+------------+--

In [None]:
# Check that every remaining column is numeric before the data is handed to XGBoost.
final_df.printSchema()

root
 |-- Host Response Rate: integer (nullable = true)
 |-- Host Acceptance Rate: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Weekly Price: integer (nullable = true)
 |-- Monthly Price: integer (nullable = true)
 |-- Maximum Nights: integer (nullable = true)
 |-- Review Scores Accuracy: integer (nullable = true)
 |-- Review Scores Cleanliness: integer (nullable = true)
 |-- Review Scores Checkin: integer (nullable = true)
 |-- Review Scores Communication: integer (nullable = true)
 |-- Review Scores Location: integer (nullable = true)
 |-- Review Scores Value: integer (nullable = true)
 |-- Bedrooms: integer (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- Beds: integer (nullable = true)
 |-- Extra People: integer (nullable = true)
 |-- Minimum Nights: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- Host_Response_index: double (nullable = false)
 |-- Cancellation_index: double (nullable = false)
 |-- Property_Type_index: d

In [None]:

# Split the data 70/30 into train/test. A fixed seed makes the split, and therefore
# the reported train/test errors, reproducible across runs.
SPLIT_SEED = 42
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# The test label column is named "trueLabel"; the test RayDMatrix is built with
# label='trueLabel'.
test_df = test_df.withColumnRenamed("label", "trueLabel")

print("Training Rows:", train_df.count(), " Testing Rows:", test_df.count())

Training Rows: 23343  Testing Rows: 9971


In [None]:
# Collect both splits onto the driver as pandas DataFrames (input for RayDMatrix).
train_df_pandas, test_df_pandas = train_df.toPandas(), test_df.toPandas()
print(train_df_pandas)


       Host Response Rate  Host Acceptance Rate  Price  Weekly Price  \
0                       0                     0      0             0   
1                       0                     0      0             0   
2                       0                     0      0             0   
3                       0                     0      0             0   
4                       0                     0      0             0   
...                   ...                   ...    ...           ...   
23338                 100                     0    850             0   
23339                 100                     0    850             0   
23340                 100                     0    850             0   
23341                 100                     0    895             0   
23342                 100                     0    975             0   

       Monthly Price  Maximum Nights  Review Scores Accuracy  \
0                  0               3                       0   
1      

In [None]:
# Attempted distributed loading [7]: build RayDMatrix objects straight from the
# Spark DataFrames. In this setup it fails with
#   ValueError: Unknown data source type ...
# so training uses the pandas-based matrices (train_set / test_set) instead.
# The error is caught and shown here so "Run All" does not stop at this cell.
# (Also considered: raydp's random_split(data, [0.9, 0.1], 0) and
#  ray.data.from_spark(train_df) / ray.data.from_spark(test_df).)
try:
    train_dataset = RayDMatrix(train_df, label="label", enable_categorical=True)
    test_dataset = RayDMatrix(test_df, label='trueLabel', enable_categorical=True)
except ValueError as err:
    print("RayDMatrix from a Spark DataFrame failed:", err)


__ValueError__: Unknown data source type: <class 'list'> with FileType: None.
FIX THIS by passing a supported data type. Supported data types include pandas.DataFrame, pandas.Series, np.ndarray, and CSV/Parquet file paths. If you specify a file, path, consider passing the `filetype` argument to specify the type of the source. Use the `RayFileType` enum for that. If using Modin, Dask, or Petastorm, make sure the library is installed.
__Code that raised the error:__

```
train_set = RayDMatrix(train_list, label='label', enable_categorical=True) # error: enable_categorical=True
test_set = RayDMatrix(test_list, label='trueLabel', enable_categorical=True)
```

In [None]:
# Build the XGBoost-Ray matrices from the pandas splits, following the classifier
# example [1].
# An earlier attempt (RayDMatrix over Ray datasets) failed on the remote actors with:
#   "When categorical type is supplied, DMatrix parameter `enable_categorical` must
#    be set to `True`. Invalid columns: Host Response Time, Property Type, Room Type,
#    Cancellation Policy"
# Those raw string columns are no longer present in final_df.
# (Alternative form: RayDMatrix(train_x, train_y, enable_categorical=True).)
train_set = RayDMatrix(train_df_pandas, label='label', enable_categorical=True)
test_set = RayDMatrix(test_df_pandas, label='trueLabel', enable_categorical=True)

'\ntrain_set = RayDMatrix(train_x, train_y, enable_categorical=True)\ntest_set = RayDMatrix(test_x, test_y, enable_categorical=True)'

In [None]:
# Stop the data-engineering clock started earlier and report the elapsed seconds.
end = time()
print(f"{phrase} takes {end - start} seconds")

data engineering time:  takes 41.440245151519775 seconds


#### ValueError: DataFrame.dtypes for data must be int, float, bool or category.
The message continues, as reported by `RemoteRayXGBoostActor` (pid=1198): "When categorical type is supplied, DMatrix parameter `enable_categorical` must be set to `True`. Invalid columns: __Host Response Time, Property Type, Room Type, Cancellation Policy__".
These are the raw string columns. They are index-encoded by the `StringIndexer`s and then dropped from `final_df`, so they no longer reach the DMatrix.


In [None]:
# Filled in by xgboost_ray.train(...) with per-round eval metrics, indexed as
# evals_result[<eval-set name>][<metric name>], e.g. evals_result["train"]["error"].
evals_result = {}

# Alternative configuration with 2 actors x 1 CPU:
#   bst = train({"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
#               train_set, evals_result=evals_result,
#               evals=[(train_set, "train"), (test_set, "test")], verbose_eval=False,
#               ray_params=RayParams(num_actors=2, cpus_per_actor=1))

'\nbst = train(\n    {\n        "objective": "binary:logistic",\n        "eval_metric": ["logloss", "error"], #, "rmse", "r2"],\n    },\n    train_set,\n    evals_result=evals_result,\n    evals=[(train_set, "train"), (test_set, "test")],\n    verbose_eval=False,\n    ray_params=RayParams(\n        num_actors=2,  # Number of remote actors\n        cpus_per_actor=1))\n'

In [None]:
# Distributed XGBoost training: 3 Ray actors with 1 CPU each, timed by with_benchmark.
# The actors' CPUs must be free in the cluster, otherwise placement-group creation
# times out.
# NOTE(review): the recorded TimeoutError lists bundles of {'CPU': 2.0}, which does
# not match cpus_per_actor=1 - that output may come from an earlier configuration.
bst = with_benchmark(
    'training time: ',
    lambda: train(
        {
            "objective": "binary:logistic",
            "eval_metric": ["logloss", "error"],  # also considered: "rmse", "r2"
            # "tree_method": "gpu_hist",  # for GPU nodes, with gpus_per_actor=1
        },
        train_set,
        evals=[(train_set, "train"), (test_set, "test")],
        evals_result=evals_result,
        verbose_eval=False,
        ray_params=RayParams(num_actors=3, cpus_per_actor=1),
    ),
)


TimeoutError: Placement group creation timed out. Make sure your cluster either has enough resources or use an autoscaling cluster. Current resources available: {'memory': 7393946830.0, 'node:172.30.0.247': 0.98, 'CPU': 3.0, 'object_store_memory': 4766436472.0}, resources requested by the placement group: [{'CPU': 2.0}, {'CPU': 2.0}, {'CPU': 2.0}]

In [None]:
# "error" metric of the final boosting round on the training set.
# (To persist the model first: bst.save_model("air_model.xgb").)
print(f"Final training error: {evals_result['train']['error'][-1]:.4f}")

In [None]:
# "error" metric of the final boosting round on the held-out test set.
# (To persist the model first: bst.save_model("air_model.xgb").)
print(f"Final testing error: {evals_result['test']['error'][-1]:.4f}")

In [None]:
# Score the test matrix with the trained booster on 2 Ray actors (1 CPU each).
# With objective "binary:logistic" these are presumably probabilities of label 1.
# To reuse a saved model instead (requires `import xgboost as xgb`):
#   bst = xgb.Booster(model_file="air_model.xgb")
pred_ray = predict(
    bst,
    test_set,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1),
)

print(pred_ray)

In [None]:
# Release cluster resources. Stop the RayDP Spark session first, because RayDP
# runs Spark inside Ray; then disconnect this driver from Ray.
raydp.stop_spark()
ray.shutdown()

| Run | File Size | Partitions | Executors | Cores/executor | Memory/executor | Max workers | Train (s) | S3 Read | Data Eng |
|---|---|---|---|---|---|---|---|---|---|
| Experiment 1 | 139.1MB | 30 | 1 | 1 | "2GB" | 3 | 3.75 (hist) | 9.82 | 41.4 |


### References
1. Ray Breast Cancer Example, https://github.com/ray-project/xgboost_ray 
1. Data Read API, https://docs.ray.io/en/releases-1.9.1/_modules/ray/data/read_api.html
1. actors, https://docs.ray.io/en/latest/ray-core/actors.html
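1. XGBoost on Ray (Ray 1.1.0 documentation), https://docs.ray.io/en/ray-1.1.0/xgboost-ray.html
1. Convert PySpark DataFrame to Pandas (Spark By Examples), https://sparkbyexamples.com/pyspark/convert-pyspark-dataframe-to-pandas/
1. Pandas: select columns by name or index (Spark By Examples), https://sparkbyexamples.com/pandas/pandas-select-columns-by-name-or-index/
1. XGBoost-Ray: distributed data loading, https://github.com/ray-project/xgboost_ray#distributed-data-loading
1. Building an end-to-end ML pipeline using Mars and XGBoost on Ray (Anyscale blog), https://www.anyscale.com/blog/building-an-end-to-end-ml-pipeline-using-mars-and-xgboost-on-ray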
1. Evaluation Example in Dask, https://docs.ray.io/en/latest/ray-core/examples/dask_xgboost/dask_xgboost.html
1. Read a CSV or other delimited file with Arrow, https://arrow.apache.org/docs/r/reference/read_delim_arrow.html