In [1]:
!pip install ray pandas kaggle scikit-learn nltk lightgbm pyngrok tensorboardx --quiet

In [2]:
import os
from ray.air.config import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from pyngrok import ngrok
import time
import pandas as pd
import numpy as np
import ray
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

nltk.download("stopwords")
nltk.download("punkt")
nltk.download("punkt_tab")
stop_words = set(stopwords.words("english"))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Error loading stop_words: Package 'stop_words' not found
[nltk_data]     in index
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [3]:
ray.init(ignore_reinit_error=True, num_cpus=32)

2025-03-04 13:56:37,137	INFO worker.py:1841 -- Started a local Ray instance.


0,1
Python version:,3.11.11
Ray version:,2.43.0


In [4]:
def download_and_extract(dataset, file_name):
    download_path = "./downloads"
    zip_file_path = file_name + ".zip"
    command = f"kaggle datasets download {dataset} --file {file_name}"
    os.system(command)

    os.system(f"unzip {zip_file_path} -d {download_path}")
    extracted_file_path = os.path.join(download_path, file_name)

    return os.path.join(download_path, file_name)

@ray.remote
def clean_and_label_chunk(df_chunk):
    df_chunk["review_body"] = (
        df_chunk["review_body"]
        .fillna("")
        .str.lower()
        .str.replace(r"http\S+|www\S+|https\S+", "", regex=True)
        .str.replace(r"[^a-zA-Z\s]", "", regex=True)
        .str.replace(r"\s+", " ", regex=True)
        .str.strip()
    )
    df_chunk["star_rating"] = pd.to_numeric(df_chunk["star_rating"], errors="coerce")
    df_chunk["sentiment"] = df_chunk["star_rating"].apply(lambda x: 1 if x > 3 else 0)
    return df_chunk

@ray.remote
def tokenize_and_filter_chunk(df_chunk):
    df_chunk["tokens"] = df_chunk["review_body"].fillna("").apply(
        lambda text: [
            word for word in word_tokenize(text)
            if word.isalpha() and word not in stop_words and len(word) > 2
        ]
    )
    return df_chunk

@ray.remote
def apply_tfidf_chunk(df_chunk, num_features):
    df_chunk["joined_tokens"] = df_chunk["tokens"].apply(lambda x: " ".join(x))
    vectorizer = TfidfVectorizer(max_features=num_features)
    tfidf_matrix = vectorizer.fit_transform(df_chunk["joined_tokens"])
    features = tfidf_matrix.toarray()
    feature_df = pd.DataFrame(features, columns=[f"feature_{i}" for i in range(features.shape[1])])
    feature_df["sentiment"] = df_chunk["sentiment"].values
    return feature_df

In [6]:
dataset = "cynthiarempel/amazon-us-customer-reviews-dataset"
file_name = "amazon_reviews_us_Wireless_v1_00.tsv"

file_path = download_and_extract(dataset, file_name)

# 2. Load dataset
df = pd.read_csv(
    file_path,
    sep="\t",
    dtype={"star_rating": "float32"},
    on_bad_lines="skip",
    engine="python"
)

In [None]:
#!ngrok authtoken  xxxxxxxx
#dashboard_url = ray._private.worker._global_node.webui_url
#public_url = ngrok.connect(dashboard_url.split(":")[-1], "http")
#print(f"Ray Dashboard: {public_url}")

In [6]:
df_2 = df.sample(frac=0.3, random_state=42)

In [9]:

NUM_CHUNKS = 32
NUM_FEATURES = 300
start_time = time.time()
df_chunks = np.array_split(df, NUM_CHUNKS)

cleaned_chunks = ray.get([clean_and_label_chunk.remote(chunk) for chunk in df_chunks])
df_clean = pd.concat(cleaned_chunks)

tokenized_chunks = ray.get([tokenize_and_filter_chunk.remote(chunk) for chunk in np.array_split(df_clean, NUM_CHUNKS)])
df_clean = pd.concat(tokenized_chunks)

tfidf_chunks = ray.get([apply_tfidf_chunk.remote(chunk, NUM_FEATURES) for chunk in np.array_split(df_clean, NUM_CHUNKS)])
final_df = pd.concat(tfidf_chunks)



#start_time = time.time()
#tfidf_chunks_ref = apply_tfidf_chunk.remote(
#    tokenize_and_filter_chunk.remote(
#        clean_and_label_chunk.remote(df)
#    ),
#    300
#)

#final_df = ray.get(tfidf_chunks_ref)
# Record the end time
end_time = time.time()

# Calculate execution time
execution_time = end_time - start_time

print(f"Execution time: {execution_time} seconds")

  return bound(*args, **kwds)
  return bound(*args, **kwds)
  return bound(*args, **kwds)


Execution time: 112.30492329597473 seconds


In [10]:
X = final_df.drop(columns=["sentiment"])
y = final_df["sentiment"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [24]:
train_data = pd.concat([X_train, y_train], axis=1)
test_data = pd.concat([X_test, y_test], axis=1)

In [26]:
train_dataset = ray.data.from_pandas(train_data)
test_dataset = ray.data.from_pandas(test_data)

In [28]:
start_time = time.time()
# Train LightGBM with Ray
trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(num_workers=8),  # Adjust based CPUs
    label_column="sentiment",
    num_boost_round=50,
    params={
        "objective": "binary",
        "metric": ["auc"],
        "boosting_type": "gbdt",
        "num_leaves": 31,
        "learning_rate": 0.1,
        "force_row_wise": True,
        "feature_fraction": 0.9,
    },
    datasets={"train": train_dataset, "valid": test_dataset},
)

result = trainer.fit()
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")




View detailed results here: /root/ray_results/LightGBMTrainer_2025-03-04_14-22-52

Training started without custom configuration.


[36m(LightGBMTrainer pid=31066)[0m Started distributed worker processes: 
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d36791200c88247391344b18c63c092fb58a9a2f4b2c74bec91b, ip=172.28.0.12, pid=31164) world_rank=0, local_rank=0, node_rank=0
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d36791200c88247391344b18c63c092fb58a9a2f4b2c74bec91b, ip=172.28.0.12, pid=31166) world_rank=1, local_rank=1, node_rank=0
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d36791200c88247391344b18c63c092fb58a9a2f4b2c74bec91b, ip=172.28.0.12, pid=31165) world_rank=2, local_rank=2, node_rank=0
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d36791200c88247391344b18c63c092fb58a9a2f4b2c74bec91b, ip=172.28.0.12, pid=31168) world_rank=3, local_rank=3, node_rank=0
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d36791200c88247391344b18c63c092fb58a9a2f4b2c74bec91b, ip=172.28.0.12, pid=31169) world_rank=4, local_rank=4, node_rank=0
[36m(LightGBMTrainer pid=31066)[0m - (node_id=cb53d367912

[2m[36m(pid=31832) [0mRunning 0: 0.00 row [00:00, ? row/s]

[2m[36m(pid=31832) [0m- split(8, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(SplitCoordinator pid=31832)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-03-04_13-56-35_704445_15922/logs/ray-data
[36m(SplitCoordinator pid=31832)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(8, equal=True)]


[2m[36m(pid=31833) [0mRunning 0: 0.00 row [00:00, ? row/s]

[2m[36m(pid=31833) [0m- split(8, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(SplitCoordinator pid=31833)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-03-04_13-56-35_704445_15922/logs/ray-data
[36m(SplitCoordinator pid=31833)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(8, equal=True)]


[36m(RayTrainWorker pid=31164)[0m [LightGBM] [Info] Trying to bind port 55319...
[36m(RayTrainWorker pid=31164)[0m [LightGBM] [Info] Binding port 55319 succeeded
[36m(RayTrainWorker pid=31164)[0m [LightGBM] [Info] Listening...
[36m(RayTrainWorker pid=31166)[0m [LightGBM] [Info] Trying to bind port 45565...
[36m(RayTrainWorker pid=31166)[0m [LightGBM] [Info] Binding port 45565 succeeded
[36m(RayTrainWorker pid=31166)[0m [LightGBM] [Info] Listening...
[36m(RayTrainWorker pid=31165)[0m [LightGBM] [Info] Trying to bind port 33879...
[36m(RayTrainWorker pid=31165)[0m [LightGBM] [Info] Binding port 33879 succeeded
[36m(RayTrainWorker pid=31165)[0m [LightGBM] [Info] Listening...
[36m(RayTrainWorker pid=31169)[0m [LightGBM] [Info] Trying to bind port 41209...[32m [repeated 2x across cluster][0m
[36m(RayTrainWorker pid=31169)[0m [LightGBM] [Info] Binding port 41209 succeeded[32m [repeated 2x across cluster][0m
[36m(RayTrainWorker pid=31169)[0m [LightGBM] [Info] Liste

[36m(RayTrainWorker pid=31164)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/LightGBMTrainer_2025-03-04_14-22-52/LightGBMTrainer_2894d_00000_0_2025-03-04_14-22-52/checkpoint_000000)
2025-03-04 14:24:32,133	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/root/ray_results/LightGBMTrainer_2025-03-04_14-22-52' in 0.0054s.



Training completed after 50 iterations at 2025-03-04 14:24:32. Total running time: 1min 39s

Execution time: 99.36918139457703 seconds
