In [1]:
# import time
import random
import pandas as pd
import numpy as np
# from tqdm import tqdm
# import multiprocessing
# from itertools import repeat, product
# import matplotlib.pyplot as plt


# import xgboost as xgb
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.datasets import fetch_20newsgroups
# from sklearn.model_selection import train_test_split
# from sklearn.feature_extraction.text import CountVectorizer

random.seed(42)
# %matplotlib inline

# Load the data

In [2]:
twenty_train = fetch_20newsgroups(subset='train', shuffle=True, random_state=42)

In [3]:
len(twenty_train.data)

11314

In [4]:
twenty_train.target_names

['alt.atheism',
 'comp.graphics',
 'comp.os.ms-windows.misc',
 'comp.sys.ibm.pc.hardware',
 'comp.sys.mac.hardware',
 'comp.windows.x',
 'misc.forsale',
 'rec.autos',
 'rec.motorcycles',
 'rec.sport.baseball',
 'rec.sport.hockey',
 'sci.crypt',
 'sci.electronics',
 'sci.med',
 'sci.space',
 'soc.religion.christian',
 'talk.politics.guns',
 'talk.politics.mideast',
 'talk.politics.misc',
 'talk.religion.misc']

# modeling

### sklearn with Ray backend

In [None]:
import ray
import joblib
from ray.util.joblib import register_ray

In [None]:
X, y = twenty_train.data, twenty_train.target

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

In [None]:
%time
register_ray()

In [None]:
num_exp = 50
# since these two were the fastest when using multi-processing
t = "exact"
n = -1

all_exps = []


exp_time = []
exp_score = []
experiment_dict ={}
experiment_dict["method"] = f"ray_backend n_job={n}, tree_method={t}"   

print(f"n_job={n}, tree_method={t} x {num_exp} times")

for i in tqdm(repeat(1, num_exp), total=num_exp):

    xgb_model = xgb.XGBClassifier(n_jobs=n,
                                  tree_method=t,
                                  n_estimators=100, 
                                  random_state=42
                                 )


    text_clf = Pipeline([
        ('vect', CountVectorizer(lowercase=False, ngram_range=(1,2))),
        ('clf', xgb_model)
    ])

    with joblib.parallel_backend('ray'):        
        start = time.time()
        text_clf.fit(X_train, y_train)
        end = time.time()
    exp_time.append(end-start)

    y_pred = text_clf.predict(X_test)
    score = f1_score(y_test, y_pred, average='macro')
    exp_score.append(score)
    # print(score)

experiment_dict["time_result"] = exp_time
experiment_dict["average_score"] = exp_score

all_exps.append(experiment_dict)

# Ray AI Runtimr (AIR)

In [None]:
import ray
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer
from ray.data.preprocessors import CountVectorizer


In [None]:
# Create dask dataframe 
df = pd.DataFrame({"text": twenty_train.data[:100], "target": twenty_train.target[:100]})

In [None]:
ds = ray.data.from_pandas(df)

In [None]:
ds.show(2)

In [None]:
# Split data into train and validation.
train_dataset, valid_dataset = ds.train_test_split(test_size=0.2, seed=42)

In [None]:
train_dataset

In [None]:
preprocessor = CountVectorizer(columns=["text"])

In [None]:
# XGBoost specific params
params = {
    "tree_method": "approx",
    "objective": "multi:softmax",
    "eval_metric": ["merror"],
    "num_class": df['target'].nunique()
}



trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    label_column="target",
    params=params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    num_boost_round=2,
)

In [None]:
result = trainer.fit()
print(result.metrics)

In [None]:
ray.__version__

# XGB with Dask

https://xgboost.readthedocs.io/en/stable/tutorials/dask.html

https://examples.dask.org/machine-learning/text-vectorization.html

https://examples.dask.org/machine-learning/xgboost.html

In [5]:
import dask_xgboost

import dask.dataframe as dd
from dask.distributed import Client
import dask_ml.feature_extraction.text
from dask_ml.model_selection import train_test_split





In [6]:
client = Client(n_workers=8, threads_per_worker=1, memory_limit='4GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 29.80 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54813,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 29.80 GiB

0,1
Comm: tcp://127.0.0.1:54832,Total threads: 1
Dashboard: http://127.0.0.1:54834/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54816,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-9o1uu64e,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-9o1uu64e

0,1
Comm: tcp://127.0.0.1:54833,Total threads: 1
Dashboard: http://127.0.0.1:54839/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54817,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-2436yz95,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-2436yz95

0,1
Comm: tcp://127.0.0.1:54841,Total threads: 1
Dashboard: http://127.0.0.1:54844/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54818,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-fgxl8sz0,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-fgxl8sz0

0,1
Comm: tcp://127.0.0.1:54836,Total threads: 1
Dashboard: http://127.0.0.1:54837/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54819,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-vr8_owsj,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-vr8_owsj

0,1
Comm: tcp://127.0.0.1:54843,Total threads: 1
Dashboard: http://127.0.0.1:54849/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54820,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ukpwx5no,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ukpwx5no

0,1
Comm: tcp://127.0.0.1:54842,Total threads: 1
Dashboard: http://127.0.0.1:54846/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54821,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-3v88hble,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-3v88hble

0,1
Comm: tcp://127.0.0.1:54847,Total threads: 1
Dashboard: http://127.0.0.1:54851/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54822,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ijveiegc,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ijveiegc

0,1
Comm: tcp://127.0.0.1:54853,Total threads: 1
Dashboard: http://127.0.0.1:54854/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:54823,
Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ab9q0da_,Local directory: /var/folders/b0/dcyq0pv101z20t4wq5dtl9qc0000gn/T/dask-worker-space/worker-ab9q0da_


In [7]:
df = dd.from_pandas(pd.DataFrame({"text": twenty_train.data[:100],
                                  "target": twenty_train.target[:100]}),
                    npartitions=25)

In [8]:
vect = dask_ml.feature_extraction.text.HashingVectorizer()

In [9]:
y = df['target'].to_dask_array(lengths=True)

In [10]:
X = vect.fit_transform(df['text'])

In [11]:
X.compute_chunk_sizes()
y.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,800 B,32 B
Shape,"(100,)","(4,)"
Dask graph,25 chunks in 3 graph layers,25 chunks in 3 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 800 B 32 B Shape (100,) (4,) Dask graph 25 chunks in 3 graph layers Data type int64 numpy.ndarray",100  1,

Unnamed: 0,Array,Chunk
Bytes,800 B,32 B
Shape,"(100,)","(4,)"
Dask graph,25 chunks in 3 graph layers,25 chunks in 3 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [12]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [None]:
params = {'objective': 'multi:softmax',
          'num_class':20}

bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=100)

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/dask/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/opt/anaconda3/envs/dask/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/dask/lib/python3.9/site-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'
[14:14:08] task NULL got new rank 0
[14:14:08] task NULL got new rank 1
[14:14:08] task NULL got new rank 2
[14:14:08] task NULL got new rank 3
[14:14:08] task NULL got new rank 4
[14:14:08] task NULL got new rank 5
[14:14:08] task NULL got new rank 6
[14:14:08] task NULL got new rank 7


In [None]:
y_hat = dask_xgboost.predict(client, bst, X_test).persist()
y_hat

In [None]:
y_hat.compute()

In [None]:
y_test, y_hat = dask.compute(y_test, y_hat)

In [None]:
score = f1_score(y_test, y_hat, average='macro')

# Visualize result

In [None]:
df_result = pd.DataFrame(all_exps).set_index("method")

In [None]:
df_result = df_result.reset_index()

In [None]:
# df_old = pd.read_csv('../output/df_result.csv')

# df_old['time_result'] = df_old['time_result'].apply(literal_eval)

# df_result = pd.concat([df_old, df_result])

In [None]:
df_result['time_result_avg'] = df_result['time_result'].apply(lambda x: np.mean(x))

In [None]:
df_result = df_result.sort_values(by="time_result_avg")

In [None]:
df_result.to_csv('../output/df_result_joblib_ray_29012023.csv')

In [None]:
df_result.head(2)

In [None]:
218.70/233.83

In [None]:
fig = plt.figure(figsize=(10, 10))
plt.title(f'XGB performance benchmark for {num_exp} iteration (20 Newsgroup data)')
plt.ylabel("elapsed time (sec)")
fig = df_result["time_result"].head(2).apply(lambda x: pd.Series(x)).T.boxplot(rot=45)
plt.tight_layout()
fig.figure.savefig(f'../img/performance_{num_exp}_joblib_ray.png')


In [None]:
fig = plt.figure(figsize=(10, 10))
plt.title(f'XGB average time for {num_exp} (20 Newsgroup data)')
plt.ylabel("elapsed time (sec)")

df_result_t = df_result["time_result"].head(2).apply(lambda x: pd.Series(x)).T
fig = df_result_t.reindex(df_result_t.mean().sort_values().index, axis=1).mean().plot(kind='bar', rot=45)

plt.tight_layout()
fig.figure.savefig(f'../img/average_time_{num_exp}.png')



In [None]:
# ray https://www.anyscale.com/blog/three-ways-to-speed-up-xgboost-model-training